Parsa Karami's Blog

Life runs on code.

Receiving buffered items with wait time intervals in RxNET

January 09, 2023   |   Parsa Karami on Rx

Buffering is one of the most useful operators in the Rx.NET library: it collects the values emitted by an observable sequence into batches, either by time frame or by count. In this post, I will show how to receive those buffered items with a wait interval between batches.

Reactive programming is widely used in software development, mainly in desktop and mobile applications. Personally, I use it most of the time at the office when I work on my UI-based projects. Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. It is a powerful library that lets us apply reactive programming in .NET. On my GitHub, I talk about Rx.NET in more detail and provide some samples of using Rx in C# and .NET. You can see the repository in this link.

In this post, I want to talk about how we can receive buffered items of an observable with a wait time interval. This is a problem I encountered in one of my projects. I was receiving many messages on an observable pipeline, and I used the Buffer function to take them in groups by count. However, I needed to validate each message and execute an action on it. This operation takes time, and the messages were arriving fast and in large numbers. Hence, I needed time between batches to finish processing each one. To solve this puzzle, I wrote the StepInterval extension method. So, let's see how this extension method works.

Imagine the following observable pipeline that sends nine notification messages one after another. To simulate it, I used Observable.Range(1, 9) in C#, as you can see below:

     Observable.Range(1, 9)
     .Subscribe(item =>
     {
         // A single message item
     }).DisposeWith(disposable);

The following marble diagram also indicates the observable pipeline. 

[Figure: marble diagram of the Observable.Range(1, 9) pipeline]

To receive and process the messages in batches, I used the Buffer function of the Reactive Extensions library. Buffer groups the items of an observable pipeline either by time frame or by count, so the subscriber receives a list of messages instead of one message at a time.

     Observable.Range(1, 9)
     .Buffer(3)
     .Subscribe(items =>
     {
         // A group of message items that includes three items
     }).DisposeWith(disposable);

This figure also shows the execution pipeline using a marble diagram.

[Figure: marble diagram of the pipeline after Buffer(3)]
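
Buffer can also group items by time frame instead of by count, which the text above mentions but the example does not show. The following is a minimal console sketch of that overload, assuming the System.Reactive package is referenced. The class and method names (TimeBufferDemo, CollectBatches), the 100 ms source interval, and the 350 ms window are illustrative choices, not part of the original project.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Illustrative names; not part of the original sample.
public static class TimeBufferDemo
{
    // Emits nine values 100 ms apart and groups them by time window.
    public static IList<IList<long>> CollectBatches(TimeSpan window)
    {
        return Observable.Interval(TimeSpan.FromMilliseconds(100))
            .Take(9)
            .Buffer(window)   // time-based overload: closes a batch every `window`
            .ToList()         // gather all batches into one list
            .Wait();          // block until the sequence completes
    }

    public static void Main()
    {
        foreach (var batch in CollectBatches(TimeSpan.FromMilliseconds(350)))
        {
            Console.WriteLine($"[{string.Join(", ", batch)}]");
        }
    }
}
```

Unlike Buffer(3), the batch sizes here depend on timing, so a batch may hold fewer items than its neighbors or even be empty.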

In the count-based pipeline, there is no wait interval between the buffered batches: each one arrives right after the previous one. As I mentioned, I needed time to finish processing one batch before receiving the next. You might think the Delay function can help us, but it can't. Delay shifts every message by the same amount of time; it does not add wait intervals between them. The Rx library does not provide an operator for this by default, but nothing prevents us from creating an extension method to do so.
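
To see why Delay is not enough, the following console sketch records when each batch arrives after Delay is applied. It assumes the System.Reactive package is referenced; DelayDemo and BatchArrivalTimes are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;

// Illustrative names; not part of the original sample.
public static class DelayDemo
{
    // Returns, for each batch, the seconds elapsed since subscription.
    public static IList<double> BatchArrivalTimes(TimeSpan delay)
    {
        var clock = Stopwatch.StartNew();
        return Observable.Range(1, 9)
            .Buffer(3)
            .Delay(delay)                              // shifts every batch by the same amount
            .Select(_ => clock.Elapsed.TotalSeconds)   // record the arrival time
            .ToList()
            .Wait();                                   // block until the sequence completes
    }

    public static void Main()
    {
        foreach (var seconds in BatchArrivalTimes(TimeSpan.FromSeconds(1)))
        {
            Console.WriteLine($"batch received after {seconds:F2}s");
        }
    }
}
```

Because Observable.Range produces the three batches almost instantly, they should all arrive roughly one second after subscription, still back to back, with no gap between them.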

To solve the problem, I wrote an extension method that turns each source item into a small inner observable: it emits the item immediately and then stays open for a given delay before completing. Concat then plays these inner observables one after another, so the next item is emitted only after the previous delay has elapsed. It can be used to put a delay between the messages on any observable pipeline. Here is the StepInterval extension method code.

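using System;
using System.Reactive.Linq;

// The class name is illustrative; extension methods must live in a static class.
public static class ObservableExtensions
{
    // Emits each source item, then waits minDelay before emitting the next one.
    public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
    {
        return source.Select(x =>
            Observable.Empty<T>()   // completes without emitting anything...
                .Delay(minDelay)    // ...but only after minDelay has passed
                .StartWith(x))      // the item itself is emitted immediately
            .Concat();              // the next inner sequence starts when the previous one completes
    }
}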
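
As a quick check outside of any UI, the following console sketch measures when each batch arrives once StepInterval is applied. It assumes the System.Reactive package is referenced and repeats the extension method so the example is self-contained; StepIntervalDemo and BatchArrivalTimes are hypothetical names used only here.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;

// Illustrative names; not part of the original sample.
public static class StepIntervalDemo
{
    // Same extension method as in the post, repeated for self-containment.
    public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
    {
        return source.Select(x =>
            Observable.Empty<T>()
                .Delay(minDelay)
                .StartWith(x))
            .Concat();
    }

    // Returns, for each batch, the seconds elapsed since subscription.
    public static IList<double> BatchArrivalTimes(TimeSpan gap)
    {
        var clock = Stopwatch.StartNew();
        return Observable.Range(1, 9)
            .Buffer(3)
            .StepInterval(gap)                         // at least `gap` between batches
            .Select(_ => clock.Elapsed.TotalSeconds)   // record the arrival time
            .ToList()
            .Wait();                                   // block until the sequence completes
    }

    public static void Main()
    {
        foreach (var seconds in BatchArrivalTimes(TimeSpan.FromSeconds(1)))
        {
            Console.WriteLine($"batch received after {seconds:F2}s");
        }
    }
}
```

With this construction, the first batch should arrive right away and each later batch about one interval after the previous one. Two things are worth keeping in mind: the resulting sequence completes one interval after the last item, and Concat queues the pending inner sequences in memory, so a source that stays much faster than the interval will keep growing that queue.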

Finally, I used StepInterval in my pipeline to add a delay between the buffered items I receive. To test it, I created a WPF application that shows the values and the receive time of each buffered item on the UI. Because the subscriber updates a UI control, the pipeline has to be observed on the UI thread, which is what ObserveOn(RxApp.MainThreadScheduler) does (RxApp comes from the ReactiveUI library).

Observable.Range(1, 9)
.Buffer(3)
.StepInterval(TimeSpan.FromSeconds(1))
.ObserveOn(RxApp.MainThreadScheduler)
.Subscribe(items=>
{
        var values = string.Join(", ", items);
        var receiveTime = DateTime.Now.ToString("hh:mm:ss");
        ValueLabel.Content += $"[{values}]  -  Receive time: {receiveTime}";
}).DisposeWith(disposable);

The following marble diagram shows the final results and how the StepInterval function solves my problem. 

[Figure: marble diagram of the buffered items after StepInterval]

You can also see the source code of this sample on my GitHub: https://github.com/Parsakarami/RxSamples/tree/master/Samples